/*
 * Copyright (C) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.github.pwittchen.reactivenetwork.library.rx2.network.observing.strategy;

import com.github.pwittchen.reactivenetwork.library.rx2.Connectivity;
import com.github.pwittchen.reactivenetwork.library.rx2.model.NetworkState;
import com.github.pwittchen.reactivenetwork.library.rx2.network.observing.NetworkObservingStrategy;
import com.github.pwittchen.reactivenetwork.library.rx2.utils.Log;

import com.jakewharton.nopen.annotation.Open;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

import ohos.app.Context;

import ohos.event.commonevent.CommonEventData;
import ohos.event.commonevent.CommonEventManager;
import ohos.event.commonevent.CommonEventSubscribeInfo;
import ohos.event.commonevent.CommonEventSubscriber;
import ohos.event.commonevent.MatchingSkills;

import ohos.net.ConnectionProperties;
import ohos.net.NetStatusCallback;
import ohos.net.NetManager;
import ohos.net.NetHandle;
import ohos.net.NetSpecifier;
import ohos.net.NetCapabilities;

import ohos.powermanager.PowerManager;

import ohos.rpc.RemoteException;

import org.reactivestreams.Publisher;

import static com.github.pwittchen.reactivenetwork.library.rx2.ReactiveNetwork.LOG_TAG;

import static ohos.event.commonevent.CommonEventSupport.COMMON_EVENT_DEVICE_IDLE_MODE_CHANGED;

/**
 * Network observing strategy for devices running OHOS.
 *
 * <p>Connectivity changes are collected from two sources and funnelled into a single
 * serialized {@link PublishSubject}:
 * <ul>
 *   <li>a {@link NetStatusCallback} registered with {@link NetManager}, and</li>
 *   <li>a {@link CommonEventSubscriber} listening for
 *       {@code COMMON_EVENT_DEVICE_IDLE_MODE_CHANGED}.</li>
 * </ul>
 *
 * <p>The callback, the subscriber, the last emitted connectivity and the shared
 * {@link NetworkState} are kept in instance fields, so all observables returned by one
 * instance share that state.
 */
@Open
public class HarmonyNetworkObservingStrategy implements NetworkObservingStrategy {
    /**
     * Message passed to {@link #onError(String, Exception)} when the network status
     * callback could not be removed.
     */
    protected static final String ERROR_MSG_NETWORK_CALLBACK = "could not unregister network callback";

    /**
     * Message passed to {@link #onError(String, Exception)} when the idle-mode event
     * subscriber could not be unsubscribed.
     */
    protected static final String ERROR_MSG_RECEIVER = "could not unregister receiver";

    /**
     * Message passed to {@link #onError(String, Exception)} when the idle-mode event
     * subscriber could not be subscribed.
     */
    private static final String ERROR_MSG_REGISTER_RECEIVER = "could not register receiver";

    /** Serialized subject through which every connectivity change is published. */
    private final Subject<Connectivity> connectivitySubject;

    /** Subscription info matching the device idle mode changed event. */
    private final CommonEventSubscribeInfo idleReceiverInfo;

    @SuppressWarnings("NullAway") // it has to be initialized in the Observable due to Context
    private NetStatusCallback networkCallback;

    /** Created on the first call to {@link #registerIdleReceiver(Context)} and reused afterwards. */
    private CommonEventSubscriber idleReceiver;

    /** Most recent connectivity that passed through the stream; updated after each emission. */
    private Connectivity lastConnectivity = Connectivity.create();

    /** Mutable network state updated in place by the network callback. */
    private final NetworkState networkState = new NetworkState();

    /**
     * Creates the strategy, preparing the idle-mode subscription info and the serialized
     * subject. Nothing is registered with the system until
     * {@link #observeNetworkConnectivity(Context)} is called.
     */
    @SuppressWarnings("NullAway")
    public HarmonyNetworkObservingStrategy() {
        this.idleReceiverInfo = createIdleBroadcastReceiver();
        this.connectivitySubject = PublishSubject.<Connectivity>create().toSerialized();
    }

    /**
     * Registers the idle-mode subscriber and the network status callback, then returns an
     * observable of connectivity changes.
     *
     * <p>Registration happens when this method is called, not when the returned observable
     * is subscribed. Both registrations are undone when the downstream cancels/disposes.
     * The stream starts with a connectivity built from the current {@link NetworkState} and
     * drops consecutive duplicates.
     *
     * @param context context used to obtain the {@link NetManager} and to build
     *     {@link Connectivity} instances
     * @return observable emitting connectivity changes
     */
    @Override
    public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
        final NetManager manager = NetManager.getInstance(context);
        networkCallback = createNetworkCallback(context);

        registerIdleReceiver(context);

        final NetSpecifier request =
            new NetSpecifier.Builder()
                .addCapability(NetCapabilities.NET_CAPABILITY_INTERNET)
                .addCapability(NetCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
                .build();

        manager.addNetStatusCallback(request, networkCallback);

        return connectivitySubject
            .toFlowable(BackpressureStrategy.LATEST)
            .doOnCancel(
                () -> {
                    tryToUnregisterCallback(manager);
                    tryToUnregisterReceiver();
                })
            // Remember the emitted value so the next emission can be compared against it.
            .doAfterNext(connectivity -> lastConnectivity = connectivity)
            .flatMap(
                (Function<Connectivity, Publisher<Connectivity>>)
                connectivity -> propagateAnyConnectedState(lastConnectivity, connectivity))
            .startWith(Flowable.just(Connectivity.create(context, networkState)))
            .distinctUntilChanged()
            .toObservable();
    }

    /**
     * Decides which connectivity values to emit for a new event.
     *
     * <p>When the connected flag differs between {@code last} and {@code current}, and
     * {@code last} was available while {@code current} is not, {@code current} is emitted
     * followed by {@code last}. In every other case only {@code current} is emitted.
     *
     * @param last previously emitted connectivity
     * @param current connectivity of the incoming event
     * @return {@code Publisher<Connectivity>} emitting one or two values as described above
     */
    protected Publisher<Connectivity> propagateAnyConnectedState(final Connectivity last, final Connectivity current) {
        final boolean typeChanged = last.getNetworkState().isConnected() != current.getNetworkState().isConnected();
        final boolean wasAvailable = last.available();
        final boolean isNotAvailable = !current.available();

        if (typeChanged && wasAvailable && isNotAvailable) {
            return Flowable.fromArray(current, last);
        } else {
            return Flowable.fromArray(current);
        }
    }

    /**
     * Subscribes the idle-mode event subscriber, creating it on first use.
     *
     * <p>On each event the subscriber publishes an empty connectivity while the device is
     * idle (see {@link #isIdleMode()}), otherwise a connectivity built from the current
     * {@link NetworkState}. The subscriber is created only once, so it keeps using the
     * {@code context} passed on the first call. A failed subscription is reported through
     * {@link #onError(String, Exception)} instead of being thrown.
     *
     * @param context context used to build {@link Connectivity} instances
     */
    protected void registerIdleReceiver(final Context context) {
        if (idleReceiver == null) {
            idleReceiver =
                new CommonEventSubscriber(idleReceiverInfo) {
                    @Override
                    public void onReceiveEvent(CommonEventData eventData) {
                        if (isIdleMode()) {
                            onNext(Connectivity.create());
                        } else {
                            onNext(Connectivity.create(context, networkState));
                        }
                    }
                };
        }

        try {
            CommonEventManager.subscribeCommonEvent(idleReceiver);
        } catch (RemoteException exception) {
            // This is a registration failure; report it as such rather than as an
            // unregistration failure.
            onError(ERROR_MSG_REGISTER_RECEIVER, exception);
        }
    }

    /**
     * Builds the subscription info that matches the device idle mode changed event.
     *
     * @return subscription info for {@code COMMON_EVENT_DEVICE_IDLE_MODE_CHANGED}
     */
    @NonNull
    protected CommonEventSubscribeInfo createIdleBroadcastReceiver() {
        MatchingSkills filter = new MatchingSkills();
        filter.addEvent(COMMON_EVENT_DEVICE_IDLE_MODE_CHANGED);
        return new CommonEventSubscribeInfo(filter);
    }

    /**
     * Tells whether the device is currently considered idle.
     *
     * @return {@code true} when the current power state is anything other than
     *     {@code PowerManager.PowerState.AWAKE}
     */
    protected boolean isIdleMode() {
        final PowerManager manager = new PowerManager();
        return manager.getCurrentPowerStatusInfo().getPowerState() != PowerManager.PowerState.AWAKE;
    }

    /**
     * Removes the network status callback from the given manager. A failed removal is
     * reported through {@link #onError(String, Exception)} instead of being thrown.
     *
     * @param manager manager the callback was registered with
     */
    protected void tryToUnregisterCallback(final NetManager manager) {
        boolean isRemoveSuccess = manager.removeNetStatusCallback(networkCallback);
        if (!isRemoveSuccess) {
            Exception exception = new Exception("Failed to remove NetStatusCallback");
            onError(ERROR_MSG_NETWORK_CALLBACK, exception);
        }
    }

    /**
     * Unsubscribes the idle-mode event subscriber. A failure is reported through
     * {@link #onError(String, Exception)} instead of being thrown.
     */
    protected void tryToUnregisterReceiver() {
        try {
            CommonEventManager.unsubscribeCommonEvent(idleReceiver);
        } catch (RemoteException exception) {
            onError(ERROR_MSG_RECEIVER, exception);
        }
    }

    /**
     * Logs the given message together with the exception via {@code Log.error} under
     * {@code LOG_TAG}.
     *
     * @param message description of what failed
     * @param exception the failure that occurred
     */
    @Override
    public void onError(final String message, final Exception exception) {
        Log.error(LOG_TAG, "message = " + message + " exception = " + exception);
    }

    /**
     * Creates the network status callback. Every callback method records the reported
     * data in the shared {@link NetworkState} and then publishes a new connectivity built
     * from that state.
     *
     * @param context context used to build {@link Connectivity} instances
     * @return callback to register with {@link NetManager}
     */
    protected NetStatusCallback createNetworkCallback(final Context context) {
        return new NetStatusCallback() {
            @Override
            public void onCapabilitiesChanged(
                @NonNull NetHandle network, @NonNull NetCapabilities networkCapabilities) {
                networkState.setNetHandle(network);
                networkState.setNetworkCapabilities(networkCapabilities);
                onNext(Connectivity.create(context, networkState));
            }

            @Override
            public void onConnectionPropertiesChanged(
                @NonNull NetHandle network, @NonNull ConnectionProperties linkProperties) {
                networkState.setNetHandle(network);
                networkState.setLinkProperties(linkProperties);
                onNext(Connectivity.create(context, networkState));
            }

            @Override
            public void onAvailable(NetHandle network) {
                networkState.setNetHandle(network);
                networkState.setConnected(true);
                onNext(Connectivity.create(context, networkState));
            }

            @Override
            public void onLost(NetHandle network) {
                networkState.setNetHandle(network);
                networkState.setConnected(false);
                onNext(Connectivity.create(context, networkState));
            }
        };
    }

    /**
     * Publishes a connectivity value to the shared subject.
     *
     * @param connectivity value to publish
     */
    protected void onNext(Connectivity connectivity) {
        connectivitySubject.onNext(connectivity);
    }
}
